Arbiter这个类是master进程工作的核心,主要的工作就是通过监听信号事件来维护worker进程的数量,以及通过SIGHUP/USR2等信号对应用进行热更新或者在线升级。
Arbiter类的外部调用只有Arbiter(self).run()这一句,也就是init和run两个方法,这里的self是一个WSGIApplication的实例。
Arbiter比较重要的方法有下列几个:
- setup, start, init: 初始化以及导入app配置的几个方法,例如worker类型和数量,监听地址等
init_signals, signal: 关于信号处理的方法,接收到某个信号后会自动触发相应的处理方法 - handle_xxx: 一系列以handle_作为前缀的方法,是各个信号对应的实际的处理方法
- kill_worker, kill_workers: 杀死worker进程
- spawn_worker, spawn_workers: 创建worker进程
- run: 主循环, Arbiter的核心,通过不断循环来监听信号
- sleep, wakeup: 休眠以及唤醒主循环
用一句话来概括Arbiter的工作其实就是,有一个主循环不断监听不同的信号事件,有信号的话调用相应的处理函数,以此来维护worker的数量。所以,主要就是这么个工作。下面来具体看一下几个方法:
init, setup 方法
def __init__(self, app):
os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE
self._num_workers = None
self._last_logged_active_worker_count = None
self.log = None
self.setup(app)
self.pidfile = None
self.systemd = False
self.worker_age = 0
self.reexec_pid = 0
self.master_pid = 0
self.master_name = "Master"
cwd = util.getcwd()
args = sys.argv[:]
args.insert(0, sys.executable)
# init start context
self.START_CTX = {
"args": args,
"cwd": cwd,
0: sys.executable
}
def setup(self, app):
self.app = app
self.cfg = app.cfg
if self.log is None:
self.log = self.cfg.logger_class(app.cfg)
# reopen files
if 'GUNICORN_FD' in os.environ:
self.log.reopen_files()
self.worker_class = self.cfg.worker_class # worker类型
self.address = self.cfg.address # 监听地址
self.num_workers = self.cfg.workers # worker数量
self.timeout = self.cfg.timeout
self.proc_name = self.cfg.proc_name # master进程名
self.log.debug('Current configuration:\n{0}'.format(
'\n'.join(
' {0}: {1}'.format(config, value.value)
for config, value
in sorted(self.cfg.settings.items(),
key=lambda setting: setting[1]))))
# set enviroment' variables
if self.cfg.env:
for k, v in self.cfg.env.items():
os.environ[k] = v
if self.cfg.preload_app: # 预先加载app
self.app.wsgi()
可以看到主要是对一些属性赋值,进行初始化, setup方法获取到app的worker类型,配置worker数量,监听地址以及日志配置等
run方法
def run(self):
"Main master loop."
self.start()
util._setproctitle("master [%s]" % self.proc_name) # 设置master进程名
try:
self.manage_workers() # 调控worker数量
while True:
self.maybe_promote_master()
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None # 从信号队列里面取出最早的一个信号
if sig is None:
self.sleep() # 进入睡眠或者立即
self.murder_workers()
self.manage_workers()
continue
if sig not in self.SIG_NAMES:
self.log.info("Ignoring unknown signal: %s", sig)
continue
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
if not handler:
self.log.error("Unhandled signal: %s", signame)
continue
self.log.info("Handling signal: %s", signame)
handler()
self.wakeup()
except StopIteration:
self.halt()
except KeyboardInterrupt:
self.halt()
except HaltServer as inst:
self.halt(reason=inst.reason, exit_status=inst.exit_status)
except SystemExit:
raise
except Exception:
self.log.info("Unhandled exception in main loop",
exc_info=True)
self.stop(False)
if self.pidfile is not None:
self.pidfile.unlink()
sys.exit(-1)
def start(self):
"""\
Initialize the arbiter. Start listening and set pidfile if needed.
"""
self.log.info("Starting gunicorn %s", __version__)
if 'GUNICORN_PID' in os.environ:
self.master_pid = int(os.environ.get('GUNICORN_PID'))
self.proc_name = self.proc_name + ".2"
self.master_name = "Master.2"
self.pid = os.getpid()
if self.cfg.pidfile is not None:
pidname = self.cfg.pidfile
if self.master_pid != 0:
pidname += ".2"
self.pidfile = Pidfile(pidname)
self.pidfile.create(self.pid)
self.cfg.on_starting(self)
self.init_signals()
if not self.LISTENERS:
fds = None
listen_fds = systemd.listen_fds()
if listen_fds:
self.systemd = True
fds = range(systemd.SD_LISTEN_FDS_START,
systemd.SD_LISTEN_FDS_START + listen_fds)
elif self.master_pid:
fds = []
for fd in os.environ.pop('GUNICORN_FD').split(','):
fds.append(int(fd))
self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
listeners_str = ",".join([str(l) for l in self.LISTENERS])
self.log.debug("Arbiter booted")
self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
self.log.info("Using worker: %s", self.cfg.worker_class_str)
# check worker class requirements
if hasattr(self.worker_class, "check_config"):
self.worker_class.check_config(self.cfg, self.log)
self.cfg.when_ready(self)
首先调用了start方法,start方法处理创建pid文件以及创建socket列表,并调用init_signals注册信号事件。master和worker之间通过信号来通信,
def init_signals(self):
"""\
Initialize master signal handling. Most of the signals
are queued. Child signals only wake up the master.
"""
# close old PIPE
for p in self.PIPE:
os.close(p)
# initialize the pipe
self.PIPE = pair = os.pipe()
for p in pair:
util.set_non_blocking(p)
util.close_on_exec(p)
self.log.close_on_exec()
# initialize all signals
for s in self.SIGNALS:
signal.signal(s, self.signal)
signal.signal(signal.SIGCHLD, self.handle_chld)
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()
def wakeup(self):
"""\
Wake up the arbiter by writing to the PIPE
"""
try:
os.write(self.PIPE[1], b'.')
except IOError as e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
可以看到除了SIGCHLD信号外,其他信号的handler都是signal方法,也就是接收到这些信号之后都会调用signal方法来处理,
signal方法的工作就是将信号加入队列,然后调用wakeup方法,向 self.PIPE 写入数据用于唤醒.
run方法在调用start方法之后调用了manage_workers方法,
def manage_workers(self):
"""\
Maintain the number of workers by spawning or killing
as required.
"""
if len(self.WORKERS.keys()) < self.num_workers:
self.spawn_workers()
workers = self.WORKERS.items()
workers = sorted(workers, key=lambda w: w[1].age)
while len(workers) > self.num_workers:
(pid, _) = workers.pop(0)
self.kill_worker(pid, signal.SIGTERM)
active_worker_count = len(workers)
if self._last_logged_active_worker_count != active_worker_count:
self._last_logged_active_worker_count = active_worker_count
self.log.debug("{0} workers".format(active_worker_count),
extra={"metric": "gunicorn.workers",
"value": active_worker_count,
"mtype": "gauge"})
self.WORKERS是保存当前运行的worker的字典,self.num_workers是想要维持的worker数量,通过对比当前运行的worker数量和num_workers, 少了就调用spawn_workers方法来创建,多了就调用kill_worker方法来杀死,这样就维持了num_workers数量的worker。
在调用manage_workers方法之后,就是一个while的无限循环,检查信号队列中有没有未处理的信号,如果没有信号,调用sleep方法,sleep方法检测管道是否可读,可读说明接收到信号了会调用signal方法往信号队列里添加, 继续调用manage_workers来维持worker数量;如果有信号则找到其handler, handler方法以handle_为前缀,信号名为后缀,方便直接通过信号名来找到,找到handler然后运行,然后继续等待。
spawn_worker
def spawn_worker(self):
self.worker_age += 1
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout / 2.0,
self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
worker.pid = pid
self.WORKERS[pid] = worker
return pid
# Do not inherit the temporary files of other workers
for sibling in self.WORKERS.values():
sibling.tmp.close()
# Process Child
worker.pid = os.getpid()
try:
util._setproctitle("worker [%s]" % self.proc_name)
self.log.info("Booting worker with pid: %s", worker.pid)
self.cfg.post_fork(self, worker)
worker.init_process()
sys.exit(0)
except SystemExit:
raise
except AppImportError as e:
self.log.debug("Exception while loading the application",
exc_info=True)
print("%s" % e, file=sys.stderr)
sys.stderr.flush()
sys.exit(self.APP_LOAD_ERROR)
except:
self.log.exception("Exception in worker process")
if not worker.booted:
sys.exit(self.WORKER_BOOT_ERROR)
sys.exit(-1)
finally:
self.log.info("Worker exiting (pid: %s)", worker.pid)
try:
worker.tmp.close()
self.cfg.worker_exit(self, worker)
except:
self.log.warning("Exception during worker exit:\n%s",
traceback.format_exc())
manage_workers中需要增加worker调用的是spawn_workers, 然后spawn_workers通过调用spawn_worker来创建worker。
wakeup 和 sleep 方法
主进程接收信号并处理的机制中,有几次出现了self.PIPE这个管道,我一直不太清楚具体是如何运作的,在sleep方法和wakeup方法里使用了self.PIPE,看到gunicorn源码阅读笔记这篇文章里的方法,我也将run方法中的sleep那一行注释了,并且加了一行print(time.time()),然后运行,果然发现了一些端倪,while循环执行的非常快,使用top命令可以看到该进程CPU占用一直维持在90%以上,甚至95%以上,于是便明白了这是减少master进程的资源消耗
def sleep(self):
“””\
Sleep until PIPE is readable or we timeout.
A readable PIPE means a signal occurred.
“””
try:
ready = select.select([self.PIPE[0]], [], [], 1.0) # 要么PIPE可读立即返回,要么等待超时
if not ready[0]:
return
while os.read(self.PIPE[0], 1):
pass
except (select.error, OSError) as e:
# TODO: select.error is a subclass of OSError since Python 3.3.
error_number = getattr(e, 'errno', e.args[0])
if error_number not in [errno.EAGAIN, errno.EINTR]:
raise
except KeyboardInterrupt:
sys.exit()
def wakeup(self):
"""\
Wake up the arbiter by writing to the PIPE
"""
try:
os.write(self.PIPE[1], b'.')
except IOError as e:
if e.errno not in [errno.EAGAIN, errno.EINTR]:
raise
注意sleep方法会调用select函数来检测管道是否可读,select函数的最后一个参数1.0时timeout的值,也就是select会花1秒钟来检测这个管道是否可读,超过1秒就返回,就是这个1秒钟的timeout,保证了没有信号时while循环不会循环的太快以至于CPU占用过高,在把sleep这个方法取消注释并运行后,发现进程CPU占用立即降到了0.1%左右,证实了我们的猜想。由此可以清楚的知道这个管道的作用了,当有信号到达时,往信号队列里面添加该信号,之后会调用wakeup方法,wakeup方法会往管道里写入数据,主循环在执行sleep的时候,select可以读取到管道中有数据而立即返回,不用再等待1秒的timeout。
sleep让master进程每一轮循环不要那么快,没信号的时候睡眠状态,减少资源消耗。
wakeup和sleep配合,master进程处睡眠的状态,当有信号时往管道里写数据,select检测到管道可读立即返回,不用继续休眠,随后在下一轮循环中从信号队列中取出信号进行处理。
最后来总结一下Arbiter的主要流程:
- 根据传入app的配置进行初始化,例如worker的数量,worker的类型,注册信号处理函数等
- 开始主循环,调用manage_workers维护一定数量的worker
- 主循环从检测信号队列及管道,无信号则睡眠,有信号则立即唤醒并在下一个循环中处理信号
参考:
https://meta.tn/a/3e8eb2aa5bf4febad39b6138a873637b5b8f8696b0f92dc808df9a0357d5084e
http://rur.logdown.com/posts/2015/04/03/259224